[SYSTEMDS-3949] Add native Delta Lake frame read/write via Delta Kernel #2515
Open
Baunsgaard wants to merge 10 commits into
Codecov Report
@@ Coverage Diff @@
## main #2515 +/- ##
============================================
+ Coverage 71.56% 71.69% +0.13%
- Complexity 49110 49446 +336
============================================
Files 1575 1583 +8
Lines 189793 190908 +1115
Branches 37235 37436 +201
============================================
+ Hits 135816 136864 +1048
- Misses 43480 43498 +18
- Partials 10497 10546 +49
Extend the native Delta Lake support from matrices to frames, reading and writing Delta Lake tables through the Spark-free Delta Kernel library on the single-node CP path. DML read/write with format="delta" now works for frames, discovering schema, column names, and dimensions directly from the table.

- Add FrameReaderDelta, FrameReaderDeltaParallel and FrameWriterDelta
- Wire DELTA into the frame reader and writer factories
- Refresh cached frame metadata and schema after a Delta read
- Broaden Delta frame component IO coverage

Stacked on the matrix Delta support; append/overwrite semantics, distributed execution, and time travel remain out of scope.
The native Delta read decode is CPU-bound and parallelizes per data file, so a table written as one large file cannot use more than one reader thread. Size data files toward roughly one file per expected parallel reader, capped by the configured target and floored to avoid tiny-file proliferation. This materially improves parallel-read throughput for both matrix and frame tables.

- Add the sysds.io.delta.writer.adaptivefilesize config (default true) plus adaptiveWriterTargetFileSize/createWriteEngine helpers in DeltaKernelUtils, and document the target file size as an upper bound
- Wire FrameWriterDelta and WriterDelta to size files from the block's estimated bytes (dense double footprint for matrices)
- Use the configurable DELTA_WRITER_BATCH_SIZE in FrameWriterDelta instead of a hardcoded batch size, matching the matrix writer
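
The sizing rule described in this commit can be sketched as a small clamp. This is only an illustration under stated assumptions: the class and method names, the `expectedReaders` input, and the exact rounding are hypothetical and are not taken from DeltaKernelUtils. The 4MB floor is the value quoted in the PR description.

```java
// Hypothetical sketch: names and the exact formula are assumptions, not the PR's code.
final class AdaptiveFileSizeSketch {
    /** 4MB floor, as quoted in the PR description. */
    static final long FLOOR_BYTES = 4L * 1024 * 1024;

    /**
     * @param estimatedBytes  estimated size of the block being written
     * @param expectedReaders reader threads expected on the read side (assumed input)
     * @param capBytes        configured target file size, treated as an upper bound
     * @param adaptive        value of the adaptive-file-size flag
     */
    static long targetFileSize(long estimatedBytes, int expectedReaders, long capBytes, boolean adaptive) {
        if (!adaptive || estimatedBytes <= 0 || expectedReaders <= 0)
            return capBytes; // flag off or nothing to estimate: use the fixed target
        // Ceiling division: aim for roughly one data file per expected reader.
        long perReader = (estimatedBytes + expectedReaders - 1) / expectedReaders;
        // Apply the floor first and the cap last, so the configured cap always wins.
        return Math.min(Math.max(perReader, FLOOR_BYTES), capBytes);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // 256 MB block, 8 readers, 128 MB cap: prints "32 MB"
        System.out.println(targetFileSize(256 * mb, 8, 128 * mb, true) / mb + " MB");
    }
}
```

Applying the cap after the floor is a design choice of this sketch; it keeps the configured target a strict upper bound even if it is set below the floor.
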
The parallel frame reader's metadata-direct path wrote each data file's rows into shared per-column arrays at a fixed offset without bounding the row count, so a table whose per-file numRecords statistic under-counts the actual rows (possible for externally written Delta tables) could overrun its slice into the next file's region under concurrent writes.

- Add the per-file row-count overflow guard in FrameReaderDeltaParallel.readDirect, matching the matrix reader: fail fast with a clear message instead of risking overlapping concurrent writes or an array overrun
- Reuse DeltaKernelUtils.typeCode/T_* in FrameReaderDelta instead of a forked R_* table and instanceof cascade, keeping the frame and matrix type dispatch in lockstep; drop the now-unused type imports
- Extract awaitFileTasks in FrameReaderDeltaParallel to share the pool lifecycle across both read paths and restore the interrupt flag when a parallel read is cancelled
- Add a unit test covering the adaptive target-file-size flag on/off and the floor/cap clamp boundaries
- Clarify the adaptive-size javadoc floor wording, the createWriteEngine batch-size comment, and rename opaque locals (names2, bcs/bss)
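
The interrupt handling mentioned for awaitFileTasks follows a common Java pattern: cancel outstanding work, restore the interrupt flag, then fail. The sketch below shows that pattern only; the signature, the use of Runnable tasks, and the unchecked exception type are assumptions and may differ from the helper in FrameReaderDeltaParallel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: the signature and exception types are assumptions, not the PR's code.
final class AwaitFileTasksSketch {
    /** Submits one task per data file and blocks until all of them finish. */
    static void awaitFileTasks(ExecutorService pool, List<Runnable> fileTasks) {
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (Runnable task : fileTasks)
                futures.add(pool.submit(task));
            for (Future<?> f : futures)
                f.get(); // rethrows task failures as ExecutionException
        } catch (InterruptedException e) {
            cancelAll(futures);
            Thread.currentThread().interrupt(); // restore the flag for callers further up
            throw new IllegalStateException("Parallel read was interrupted", e);
        } catch (ExecutionException e) {
            cancelAll(futures);
            throw new IllegalStateException("A file task failed", e.getCause());
        }
    }

    private static void cancelAll(List<Future<?>> futures) {
        for (Future<?> f : futures)
            f.cancel(true);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int[] rowsPerFile = new int[3];
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < rowsPerFile.length; i++) {
            final int file = i;
            tasks.add(() -> rowsPerFile[file] = 100 * (file + 1)); // stand-in for decoding one file
        }
        try {
            awaitFileTasks(pool, tasks);
        } finally {
            pool.shutdown();
        }
        System.out.println(Arrays.toString(rowsPerFile));
    }
}
```
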
The single-threaded frame reader extracted each batch into temporary per-batch arrays and then concatenated them into the final column arrays, allocating and copying every column twice. Delta's per-file numRecords statistic already provides exact row counts from metadata, so the output can be sized up front and decoded in a single pass with no intermediate buffers.

- Rewrite FrameReaderDelta.readFrameFromHDFS to pre-size one typed array per column from the metadata row counts and decode each data file straight into its row offset (with the same per-file overflow guard as the parallel reader); fall back to the buffered extract-then-concatenate path only when exact counts are unavailable (missing statistics or deletion vectors present)
- Move allocColumn/createColumn/extractColumnInto up to FrameReaderDelta so the serial and parallel readers share one copy instead of duplicating the per-type column dispatch
- The parallel reader's single-file/low-thread fallback now also decodes in a single direct pass
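
Pre-sizing from metadata amounts to a prefix sum over the per-file row counts, with a signal to fall back when exact counts are unavailable. The following is a minimal sketch of that idea; the method name, the use of -1 for a missing statistic, and the null return for "use the buffered path" are conventions invented for this example.

```java
import java.util.Arrays;

// Hypothetical sketch: the method name and the -1 "missing statistic" convention are assumptions.
final class DirectReadOffsetsSketch {
    /**
     * Returns the start row of each data file, with the total row count in the last slot,
     * or null when exact counts are unavailable and the buffered path should be used.
     */
    static int[] rowOffsets(long[] numRecordsPerFile, boolean hasDeletionVectors) {
        if (hasDeletionVectors)
            return null; // statistics no longer match the visible rows
        int[] offsets = new int[numRecordsPerFile.length + 1];
        long total = 0;
        for (int i = 0; i < numRecordsPerFile.length; i++) {
            if (numRecordsPerFile[i] < 0)
                return null; // missing statistic for this file
            offsets[i] = Math.toIntExact(total); // throws if the table exceeds int row indexing
            total += numRecordsPerFile[i];
        }
        offsets[numRecordsPerFile.length] = Math.toIntExact(total);
        return offsets;
    }

    public static void main(String[] args) {
        int[] offsets = rowOffsets(new long[] {3, 0, 5}, false);
        // One typed array per column, sized once from the total row count.
        double[] column = new double[offsets[offsets.length - 1]];
        // prints "[0, 3, 3, 8] rows=8"
        System.out.println(Arrays.toString(offsets) + " rows=" + column.length);
    }
}
```
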
Route the Delta frame readers' column allocation and wrapping through ArrayFactory instead of reader-local per-type switches:

- Add ArrayFactory.allocateBacking(ValueType,int) as the single ValueType -> raw backing array mapping (the inverse of create(ValueType,Object)), and remove the duplicate allocColumn / createColumn / extractColumn / buildColumn switches from FrameReaderDelta and FrameReaderDeltaParallel. The buffered fallback now reuses the same alloc + extractColumnInto + concatColumn primitives as the direct path.
- Make create(ValueType,Object) bit-pack boolean columns above the switch point into a BitSetArray (mirroring allocateBoolean), so Delta reads produce the same compact representation as every other frame reader instead of a byte-backed BooleanArray.
- Simplify allocate(ValueType,int) to compose create + allocateBacking, keeping only the boolean special case (empty BitSet backing) and moving the UINT4/UINT8 fallback warning into allocateBacking.
- Move useDirectPath to FrameReaderDelta so both readers share it.
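
The boolean bit-packing can be pictured with java.util.BitSet standing in for SystemDS's BitSetArray. This sketch is not ArrayFactory code: the method name, the Object return type, and the caller-supplied switchPoint are assumptions made for illustration, and the real switch point is not stated in this PR.

```java
import java.util.BitSet;

// Hypothetical sketch: java.util.BitSet stands in for BitSetArray; switchPoint is an assumed parameter.
final class BooleanWrapSketch {
    /** Keeps short columns as boolean[] and bit-packs columns longer than switchPoint. */
    static Object wrapBoolean(boolean[] raw, int switchPoint) {
        if (raw.length <= switchPoint)
            return raw; // small column: keep the plain backing array
        BitSet bits = new BitSet(raw.length);
        for (int i = 0; i < raw.length; i++)
            if (raw[i])
                bits.set(i); // one bit per row instead of one byte
        return bits;
    }

    public static void main(String[] args) {
        boolean[] column = new boolean[1000];
        column[7] = true;
        Object wrapped = wrapBoolean(column, 64);
        System.out.println(wrapped.getClass().getSimpleName());
    }
}
```
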
Add timing/throughput benchmarks for the serial and parallel Delta frame readers:

- DeltaFrameRead in the performance suite (dispatched as Main id 18): writes a random frame to a temp Delta table once as untimed setup, then repeatedly reads it back under timing for serial/parallel/both modes with a configurable writer target file size. Suitable for running under async-profiler.
- DeltaFrameReadPerf: JUnit-based manual micro-benchmarks (all @Ignore so they never run in the normal build) covering direct-vs-buffered serial reads, adaptive file sizing, target-size and batch-size sweeps, and schema-composition breakdowns.
Reformat the new Delta frame reader/writer and test files with the project Eclipse style (dev/CodeStyle_eclipse.xml). Whitespace and layout only, no behavioral changes.
- Replace the bespoke genMixedFrame generator in DeltaFrameReadWriteTest with TestUtils.generateRandomFrameBlock, removing duplicated random frame generation code.
- Add DeltaFrameSparkInteropTest exercising cross-engine round-trips against the reference Delta Spark connector: SystemDS-written multi-file frame read by Spark, Spark-written multi-file frame read by the serial and parallel SystemDS readers, and a Spark table with deletion vectors read via the buffered selection-mask path. Comparisons are keyed by a unique id column so no row order is assumed.
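
Keying comparisons by a unique id column is what makes the interop checks independent of row order. A minimal sketch of such a comparison follows; it uses plain Object[][] rows rather than FrameBlock or Spark rows, and the helper name is hypothetical rather than taken from DeltaFrameSparkInteropTest.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: rows are plain Object[] here, not FrameBlock or Spark Row instances.
final class KeyedRowCompareSketch {
    /** Returns true if both tables hold the same rows, matched by the id column, in any order. */
    static boolean sameRowsById(Object[][] expected, Object[][] actual, int idCol) {
        if (expected.length != actual.length)
            return false;
        Map<Object, Object[]> byId = new HashMap<>();
        for (Object[] row : expected)
            if (byId.put(row[idCol], row) != null)
                throw new IllegalArgumentException("Duplicate id: " + row[idCol]);
        for (Object[] row : actual) {
            Object[] match = byId.remove(row[idCol]);
            // Arrays.equals compares cell by cell and treats two nulls as equal.
            if (match == null || !Arrays.equals(match, row))
                return false;
        }
        return byId.isEmpty();
    }

    public static void main(String[] args) {
        Object[][] written = {{1L, "a"}, {2L, null}, {3L, "c"}};
        Object[][] readBack = {{3L, "c"}, {1L, "a"}, {2L, null}};
        System.out.println(sameRowsById(written, readBack, 0)); // prints "true"
    }
}
```
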
- Fail loud when a data file decodes fewer rows than its numRecords statistic (silent underflow) in both the serial and parallel direct read paths, alongside the existing overflow guard, and compute the selected-row count once per batch.
- Extract a shared ReadPlan (read codes, value types, names) and a readWithHandle entry point so the schema derivation lives in one place and the parallel reader reuses its already-opened scan handle for the single-file fallback instead of re-opening the snapshot.
- Move the null-result check above the metadata refresh in FrameObject so it can actually fire before data is dereferenced.
- Log the adaptive writer file-size decision at debug level and document the INT64 boxing and null precondition on the frame writer.
- Consolidate duplicated countParquet test logic into DeltaFrameTestUtils, add a loan-pattern helper for multi-file tables, assert on exception messages, and use assertEquals for per-cell comparisons.
- Remove the redundant DeltaFrameReadPerf micro-benchmark.
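
The overflow and underflow guards on the direct path can be illustrated with a single double column and batches passed as arrays. This is a sketch under assumptions: the method name, the batch representation, and the exception type and messages are invented here and are not the readers' actual code.

```java
import java.util.Arrays;

// Hypothetical sketch: batches are plain double[] here; names and messages are assumptions.
final class SliceGuardSketch {
    /** Copies one file's batches into dest[offset, offset + expectedRows) and checks the row count. */
    static void decodeFile(double[] dest, int offset, int expectedRows, double[][] batches) {
        int written = 0;
        for (double[] batch : batches) {
            // Overflow guard: check before copying, so the next file's slice is never touched.
            if (batch.length > expectedRows - written)
                throw new IllegalStateException(
                    "Data file decoded more rows than its numRecords statistic (" + expectedRows + ")");
            System.arraycopy(batch, 0, dest, offset + written, batch.length);
            written += batch.length;
        }
        // Underflow guard: fewer rows than claimed would leave silent default values behind.
        if (written < expectedRows)
            throw new IllegalStateException(
                "Data file decoded " + written + " rows, numRecords statistic claims " + expectedRows);
    }

    public static void main(String[] args) {
        double[] column = new double[5];
        decodeFile(column, 0, 2, new double[][] {{1, 2}});      // file A: rows 0..1
        decodeFile(column, 2, 3, new double[][] {{3}, {4, 5}}); // file B: rows 2..4
        System.out.println(Arrays.toString(column));
    }
}
```

Checking before the copy matters for the parallel reader, where each file's slice may be written by a different thread.
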
…elUtils

The prior commit ran the Eclipse formatter over the entire files, reflowing many lines this PR does not touch. Restore both files to a minimal diff that carries only the intended changes: the null-result check moved above the metadata refresh in FrameObject, and the debug log for the adaptive writer file-size decision in DeltaKernelUtils.
Extend the native Delta Lake support (#2511) from matrices to frames, reading and
writing Delta Lake tables through the Spark-free Delta Kernel library on the
single-node CP path. DML read/write with format="delta" now works for frames,
discovering schema, column names, and dimensions directly from the table.
Stacked on #2511 and should merge after it. Append/overwrite semantics, distributed
execution, and time travel remain out of scope.
Frame read/write
- FrameReaderDelta / FrameWriterDelta: column-at-a-time materialization into typed frame Arrays (no per-cell boxing on the read path), wired into FrameReader/WriterFactory for format="delta".
- FrameReaderDeltaParallel: decodes one task per parquet data file on the common thread pool (the parquet decode dominates a Delta read), falling back to the serial reader for single-file tables. Selected iff parallel CP read is enabled.
- Both readers share a direct path that sizes the output from the per-file numRecords statistics and decodes straight into each row offset, and a buffered fallback (per-batch decode + concatenate) for tables without exact row counts or with deletion vectors. Both guard against files that decode more or fewer rows than their statistic claims.
- FrameObject now refreshes the cached metadata/schema after a Delta read (dimensions and schema are discovered at read time) and counts the HDFS write for statistics.
Adaptive writer file sizing
sysds.delta.writer.adaptive.filesize (default true): the writer targets roughly one data file per expected parallel reader, clamped to the configured sysds.delta.writer.targetfilesize cap and a 4MB floor, so the per-file parallel read can use all threads. Applies to both the matrix (WriterDelta) and frame writers via shared helpers in DeltaKernelUtils; set the flag to false for a fixed target.
Shared column allocation
ArrayFactory gains allocateBacking(ValueType, int) and makes create(ValueType, Object) the single place that wraps a raw backing array into a frame Array, bit-packing large boolean columns into BitSetArray (consistent with other readers) and mapping UINT4/UINT8 to INT32. The Delta readers reuse these primitives instead of duplicating per-type allocation logic.
Tests
- DeltaFrameReadWriteTest: covers mixed/large/multi-batch frames, string nulls, empty tables, short/byte coercion, non-mappable-type rejection, and serial-vs-parallel / direct-vs-buffered equivalence across multi-file tables.
- DeltaFrameSparkInteropTest: SystemDS↔Spark round-trips, multi-file reads, and Spark-written tables with deletion vectors.
- FrameDeltaReadWriteTest.
- @Ignored read benchmarks (DeltaFrameReadPerf, performance/DeltaFrameRead) for profiling the read path; not run in CI.